e7240e207db697fd0f82de28fb95351acb8e5daa,spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java,ConcurrentMessageListenerContainerTests,testAutoCommitWithRebalanceListener,#,136
Before Change
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final List<String> listenerThreadNames = new ArrayList<>();
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
}
});
final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
After Change
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {